diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8ddddbeee598f..5e077285ade55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2775,32 +2775,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } } } - - test("SPARK-21791 ORC should support column names with dot") { - val orc = classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat].getCanonicalName - withTempDir { dir => - val path = new File(dir, "orc").getCanonicalPath - Seq(Some(1), None).toDF("col.dots").write.format(orc).save(path) - assert(spark.read.format(orc).load(path).collect().length == 2) - } - } - - test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { - withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") { - val e = intercept[AnalysisException] { - sql("CREATE TABLE spark_20728(a INT) USING ORC") - } - assert(e.message.contains("Hive built-in ORC data source must be used with Hive support")) - } - - withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") { - withTable("spark_20728") { - sql("CREATE TABLE spark_20728(a INT) USING ORC") - val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { - case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass - } - assert(fileFormat == Some(classOf[OrcFileFormat])) - } - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala similarity index 87% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index de6f0d67f1734..a5f6b68ee862e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -15,25 +15,32 @@ * limitations under the License. */ -package org.apache.spark.sql.hive.orc +package org.apache.spark.sql.execution.datasources.orc import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ -import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.apache.spark.sql.{Column, DataFrame, QueryTest} +import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ /** - * A test suite that tests ORC filter API based filter pushdown optimization. + * A test suite that tests Apache ORC filter API based filter pushdown optimization. + * OrcFilterSuite and HiveOrcFilterSuite are logically duplicated to provide the same test coverage. + * The difference is the packages containing 'Predicate' and 'SearchArgument' classes. + * - OrcFilterSuite uses 'org.apache.orc.storage.ql.io.sarg' package. + * - HiveOrcFilterSuite uses 'org.apache.hadoop.hive.ql.io.sarg' package.
*/ -class OrcFilterSuite extends QueryTest with OrcTest { +class OrcFilterSuite extends OrcTest with SharedSQLContext { + private def checkFilterPredicate( df: DataFrame, predicate: Predicate, @@ -55,7 +62,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) assert(selectedFilters.nonEmpty, "No filter is pushed down") - val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray) + val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters) assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters") checker(maybeFilter.get) } @@ -99,7 +106,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) assert(selectedFilters.nonEmpty, "No filter is pushed down") - val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray) + val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters) assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters") } @@ -284,40 +291,27 @@ class OrcFilterSuite extends QueryTest with OrcTest { test("filter pushdown - combinations with logical operators") { withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df => - // Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked - // in string form in order to check filter creation including logical operators - // such as `and`, `or` or `not`. So, this function uses `SearchArgument.toString()` - // to produce string expression and then compare it to given string expression below. - // This might have to be changed after Hive version is upgraded. checkFilterPredicate( '_1.isNotNull, - """leaf-0 = (IS_NULL _1) - |expr = (not leaf-0)""".stripMargin.trim + "leaf-0 = (IS_NULL _1), expr = (not leaf-0)" ) checkFilterPredicate( '_1 =!= 1, - """leaf-0 = (IS_NULL _1) - |leaf-1 = (EQUALS _1 1) - |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim + "leaf-0 = (IS_NULL _1), leaf-1 = (EQUALS _1 1), expr = (and (not leaf-0) (not leaf-1))" ) checkFilterPredicate( !('_1 < 4), - """leaf-0 = (IS_NULL _1) - |leaf-1 = (LESS_THAN _1 4) - |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim + "leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 4), expr = (and (not leaf-0) (not leaf-1))" ) checkFilterPredicate( '_1 < 2 || '_1 > 3, - """leaf-0 = (LESS_THAN _1 2) - |leaf-1 = (LESS_THAN_EQUALS _1 3) - |expr = (or leaf-0 (not leaf-1))""".stripMargin.trim + "leaf-0 = (LESS_THAN _1 2), leaf-1 = (LESS_THAN_EQUALS _1 3), " + + "expr = (or leaf-0 (not leaf-1))" ) checkFilterPredicate( '_1 < 2 && '_1 > 3, - """leaf-0 = (IS_NULL _1) - |leaf-1 = (LESS_THAN _1 2) - |leaf-2 = (LESS_THAN_EQUALS _1 3) - |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim + "leaf-0 = (IS_NULL _1), leaf-1 = (LESS_THAN _1 2), leaf-2 = (LESS_THAN_EQUALS _1 3), " + + "expr = (and (not leaf-0) leaf-1 (not leaf-2))" ) } } @@ -344,4 +338,30 @@ class OrcFilterSuite extends QueryTest with OrcTest { checkNoFilterPredicate('_1.isNotNull) } } + + test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { + import org.apache.spark.sql.sources._ + // The `LessThan` should be converted while the `StringContains` shouldn't + val schema = new StructType( + Array( + StructField("a", IntegerType, nullable = true), + StructField("b", StringType, nullable = true))) + assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") { + 
OrcFilters.createFilter(schema, Array( + LessThan("a", 10), + StringContains("b", "prefix") + )).get.toString + } + + // The `LessThan` should be converted while the whole inner `And` shouldn't + assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") { + OrcFilters.createFilter(schema, Array( + LessThan("a", 10), + Not(And( + GreaterThan("a", 1), + StringContains("b", "prefix") + )) + )).get.toString + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala similarity index 82% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala index d1ce3f1e2f058..d1911ea7f32a9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala @@ -15,19 +15,12 @@ * limitations under the License. */ -package org.apache.spark.sql.hive.orc +package org.apache.spark.sql.execution.datasources.orc import java.io.File -import scala.reflect.ClassTag -import scala.reflect.runtime.universe.TypeTag - -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.scalatest.BeforeAndAfterAll - import org.apache.spark.sql._ -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.util.Utils +import org.apache.spark.sql.test.SharedSQLContext // The data where the partitioning key exists only in the directory structure. case class OrcParData(intField: Int, stringField: String) @@ -35,28 +28,8 @@ case class OrcParData(intField: Int, stringField: String) // The data that also includes the partitioning key case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) -// TODO This test suite duplicates ParquetPartitionDiscoverySuite a lot -class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { - import spark._ - import spark.implicits._ - - val defaultPartitionName = ConfVars.DEFAULTPARTITIONNAME.defaultStrVal - - def withTempDir(f: File => Unit): Unit = { - val dir = Utils.createTempDir().getCanonicalFile - try f(dir) finally Utils.deleteRecursively(dir) - } - - def makeOrcFile[T <: Product: ClassTag: TypeTag]( - data: Seq[T], path: File): Unit = { - data.toDF().write.mode("overwrite").orc(path.getCanonicalPath) - } - - - def makeOrcFile[T <: Product: ClassTag: TypeTag]( - df: DataFrame, path: File): Unit = { - df.write.mode("overwrite").orc(path.getCanonicalPath) - } +abstract class OrcPartitionDiscoveryTest extends OrcTest { + val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" protected def withTempTable(tableName: String)(f: => Unit): Unit = { try f finally spark.catalog.dropTempView(tableName) @@ -90,7 +63,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) } - read.orc(base.getCanonicalPath).createOrReplaceTempView("t") + spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t") withTempTable("t") { checkAnswer( @@ -137,7 +110,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) } - read.orc(base.getCanonicalPath).createOrReplaceTempView("t") + 
spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t") withTempTable("t") { checkAnswer( @@ -186,8 +159,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) } - read - .option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName) + spark.read + .option("hive.exec.default.partition.name", defaultPartitionName) .orc(base.getCanonicalPath) .createOrReplaceTempView("t") @@ -228,8 +201,8 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) } - read - .option(ConfVars.DEFAULTPARTITIONNAME.varname, defaultPartitionName) + spark.read + .option("hive.exec.default.partition.name", defaultPartitionName) .orc(base.getCanonicalPath) .createOrReplaceTempView("t") @@ -253,3 +226,4 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B } } +class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala similarity index 68% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 1ffaf30311037..e00e057a18cc6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -15,24 +15,27 @@ * limitations under the License. */ -package org.apache.spark.sql.hive.orc +package org.apache.spark.sql.execution.datasources.orc +import java.io.File import java.nio.charset.StandardCharsets import java.sql.Timestamp import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.orc.{OrcConf, OrcFile} import org.apache.orc.OrcConf.COMPRESS -import org.scalatest.BeforeAndAfterAll +import org.apache.orc.mapred.OrcStruct +import org.apache.orc.mapreduce.OrcInputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator} -import org.apache.spark.sql.hive.HiveUtils -import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.util.Utils @@ -57,7 +60,8 @@ case class Contact(name: String, phone: String) case class Person(name: String, age: Int, contacts: Seq[Contact]) -class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { +abstract class OrcQueryTest extends OrcTest { + import testImplicits._ test("Read/write All Types") { val data = (0 to 255).map { i => @@ -73,7 +77,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("Read/write binary data") 
{ withOrcFile(BinaryData("test".getBytes(StandardCharsets.UTF_8)) :: Nil) { file => - val bytes = read.orc(file).head().getAs[Array[Byte]](0) + val bytes = spark.read.orc(file).head().getAs[Array[Byte]](0) assert(new String(bytes, StandardCharsets.UTF_8) === "test") } } @@ -91,7 +95,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withOrcFile(data) { file => checkAnswer( - read.orc(file), + spark.read.orc(file), data.toDF().collect()) } } @@ -172,7 +176,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withOrcFile(data) { file => checkAnswer( - read.orc(file), + spark.read.orc(file), Row(Seq.fill(5)(null): _*)) } } @@ -183,9 +187,13 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { spark.range(0, 10).write .option(COMPRESS.getAttribute, "ZLIB") .orc(file.getCanonicalPath) - val expectedCompressionKind = - OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression - assert("ZLIB" === expectedCompressionKind.name()) + + val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc")) + assert(maybeOrcFile.isDefined) + + val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath) + val conf = OrcFile.readerOptions(new Configuration()) + assert("ZLIB" === OrcFile.createReader(orcFilePath, conf).getCompressionKind.name) } // `compression` overrides `orc.compress`. @@ -194,9 +202,13 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { .option("compression", "ZLIB") .option(COMPRESS.getAttribute, "SNAPPY") .orc(file.getCanonicalPath) - val expectedCompressionKind = - OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression - assert("ZLIB" === expectedCompressionKind.name()) + + val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc")) + assert(maybeOrcFile.isDefined) + + val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath) + val conf = OrcFile.readerOptions(new Configuration()) + assert("ZLIB" === OrcFile.createReader(orcFilePath, conf).getCompressionKind.name) } } @@ -206,39 +218,39 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { spark.range(0, 10).write .option("compression", "ZLIB") .orc(file.getCanonicalPath) - val expectedCompressionKind = - OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression - assert("ZLIB" === expectedCompressionKind.name()) + + val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc")) + assert(maybeOrcFile.isDefined) + + val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath) + val conf = OrcFile.readerOptions(new Configuration()) + assert("ZLIB" === OrcFile.createReader(orcFilePath, conf).getCompressionKind.name) } withTempPath { file => spark.range(0, 10).write .option("compression", "SNAPPY") .orc(file.getCanonicalPath) - val expectedCompressionKind = - OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression - assert("SNAPPY" === expectedCompressionKind.name()) + + val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".snappy.orc")) + assert(maybeOrcFile.isDefined) + + val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath) + val conf = OrcFile.readerOptions(new Configuration()) + assert("SNAPPY" === OrcFile.createReader(orcFilePath, conf).getCompressionKind.name) } withTempPath { file => spark.range(0, 10).write .option("compression", "NONE") .orc(file.getCanonicalPath) - val expectedCompressionKind = - OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression - assert("NONE" === 
expectedCompressionKind.name()) - } - } - // Following codec is not supported in Hive 1.2.1, ignore it now - ignore("LZO compression options for writing to an ORC file not supported in Hive 1.2.1") { - withTempPath { file => - spark.range(0, 10).write - .option("compression", "LZO") - .orc(file.getCanonicalPath) - val expectedCompressionKind = - OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression - assert("LZO" === expectedCompressionKind.name()) + val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".orc")) + assert(maybeOrcFile.isDefined) + + val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath) + val conf = OrcFile.readerOptions(new Configuration()) + assert("NONE" === OrcFile.createReader(orcFilePath, conf).getCompressionKind.name) } } @@ -256,22 +268,28 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("appending") { val data = (0 until 10).map(i => (i, i.toString)) - createDataFrame(data).toDF("c1", "c2").createOrReplaceTempView("tmp") + spark.createDataFrame(data).toDF("c1", "c2").createOrReplaceTempView("tmp") withOrcTable(data, "t") { sql("INSERT INTO TABLE t SELECT * FROM tmp") - checkAnswer(table("t"), (data ++ data).map(Row.fromTuple)) + checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple)) } - sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) + spark.sessionState.catalog.dropTable( + TableIdentifier("tmp"), + ignoreIfNotExists = true, + purge = false) } test("overwriting") { val data = (0 until 10).map(i => (i, i.toString)) - createDataFrame(data).toDF("c1", "c2").createOrReplaceTempView("tmp") + spark.createDataFrame(data).toDF("c1", "c2").createOrReplaceTempView("tmp") withOrcTable(data, "t") { sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") - checkAnswer(table("t"), data.map(Row.fromTuple)) + checkAnswer(spark.table("t"), data.map(Row.fromTuple)) } - sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false) + spark.sessionState.catalog.dropTable( + TableIdentifier("tmp"), + ignoreIfNotExists = true, + purge = false) } test("self-join") { @@ -334,60 +352,16 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withTempPath { dir => val path = dir.getCanonicalPath - spark.range(0, 10).select('id as "Acol").write.format("orc").save(path) - spark.read.format("orc").load(path).schema("Acol") + spark.range(0, 10).select('id as "Acol").write.orc(path) + spark.read.orc(path).schema("Acol") intercept[IllegalArgumentException] { - spark.read.format("orc").load(path).schema("acol") + spark.read.orc(path).schema("acol") } - checkAnswer(spark.read.format("orc").load(path).select("acol").sort("acol"), + checkAnswer(spark.read.orc(path).select("acol").sort("acol"), (0 until 10).map(Row(_))) } } - test("SPARK-8501: Avoids discovery schema from empty ORC files") { - withTempPath { dir => - val path = dir.getCanonicalPath - - withTable("empty_orc") { - withTempView("empty", "single") { - spark.sql( - s"""CREATE TABLE empty_orc(key INT, value STRING) - |STORED AS ORC - |LOCATION '${dir.toURI}' - """.stripMargin) - - val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) - emptyDF.createOrReplaceTempView("empty") - - // This creates 1 empty ORC file with Hive ORC SerDe. We are using this trick because - // Spark SQL ORC data source always avoids write empty ORC files. 
- spark.sql( - s"""INSERT INTO TABLE empty_orc - |SELECT key, value FROM empty - """.stripMargin) - - val errorMessage = intercept[AnalysisException] { - spark.read.orc(path) - }.getMessage - - assert(errorMessage.contains("Unable to infer schema for ORC")) - - val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1) - singleRowDF.createOrReplaceTempView("single") - - spark.sql( - s"""INSERT INTO TABLE empty_orc - |SELECT key, value FROM single - """.stripMargin) - - val df = spark.read.orc(path) - assert(df.schema === singleRowDF.schema.asNullable) - checkAnswer(df, singleRowDF) - } - } - } - } - test("SPARK-10623 Enable ORC PPD") { withTempPath { dir => withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { @@ -405,7 +379,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } // It needs to repartition data so that we can have several ORC files // in order to skip stripes in ORC. - createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path) + spark.createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path) val df = spark.read.orc(path) def checkPredicate(pred: Column, answer: Seq[Row]): Unit = { @@ -440,77 +414,6 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } - test("Verify the ORC conversion parameter: CONVERT_METASTORE_ORC") { - withTempView("single") { - val singleRowDF = Seq((0, "foo")).toDF("key", "value") - singleRowDF.createOrReplaceTempView("single") - - Seq("true", "false").foreach { orcConversion => - withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> orcConversion) { - withTable("dummy_orc") { - withTempPath { dir => - val path = dir.getCanonicalPath - spark.sql( - s""" - |CREATE TABLE dummy_orc(key INT, value STRING) - |STORED AS ORC - |LOCATION '${dir.toURI}' - """.stripMargin) - - spark.sql( - s""" - |INSERT INTO TABLE dummy_orc - |SELECT key, value FROM single - """.stripMargin) - - val df = spark.sql("SELECT * FROM dummy_orc WHERE key=0") - checkAnswer(df, singleRowDF) - - val queryExecution = df.queryExecution - if (orcConversion == "true") { - queryExecution.analyzed.collectFirst { - case _: LogicalRelation => () - }.getOrElse { - fail(s"Expecting the query plan to convert orc to data sources, " + - s"but got:\n$queryExecution") - } - } else { - queryExecution.analyzed.collectFirst { - case _: HiveTableRelation => () - }.getOrElse { - fail(s"Expecting no conversion from orc to data sources, " + - s"but got:\n$queryExecution") - } - } - } - } - } - } - } - } - - test("converted ORC table supports resolving mixed case field") { - withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") { - withTable("dummy_orc") { - withTempPath { dir => - val df = spark.range(5).selectExpr("id", "id as valueField", "id as partitionValue") - df.write - .partitionBy("partitionValue") - .mode("overwrite") - .orc(dir.getAbsolutePath) - - spark.sql(s""" - |create external table dummy_orc (id long, valueField long) - |partitioned by (partitionValue int) - |stored as orc - |location "${dir.toURI}"""".stripMargin) - spark.sql(s"msck repair table dummy_orc") - checkAnswer(spark.sql("select * from dummy_orc"), df) - } - } - } - } - test("SPARK-14962 Produce correct results on array type with isnotnull") { withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { val data = (0 until 10).map(i => Tuple1(Array(i))) @@ -544,7 +447,8 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withTempPath { file => // It needs to repartition data so that we can have several 
ORC files // in order to skip stripes in ORC. - createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath) + spark.createDataFrame(data).toDF("a").repartition(10) + .write.orc(file.getCanonicalPath) val df = spark.read.orc(file.getCanonicalPath).where("a == 2") val actual = stripSparkFilter(df).count() @@ -563,7 +467,8 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { withTempPath { file => // It needs to repartition data so that we can have several ORC files // in order to skip stripes in ORC. - createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath) + spark.createDataFrame(data).toDF("a").repartition(10) + .write.orc(file.getCanonicalPath) val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'") val actual = stripSparkFilter(df).count() @@ -596,14 +501,18 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("Empty schema does not read data from ORC file") { val data = Seq((1, 1), (2, 2)) withOrcFile(data) { path => - val requestedSchema = StructType(Nil) val conf = new Configuration() - val physicalSchema = OrcFileOperator.readSchema(Seq(path), Some(conf)).get - OrcFileFormat.setRequiredColumns(conf, physicalSchema, requestedSchema) - val maybeOrcReader = OrcFileOperator.getFileReader(path, Some(conf)) - assert(maybeOrcReader.isDefined) - val orcRecordReader = new SparkOrcNewRecordReader( - maybeOrcReader.get, conf, 0, maybeOrcReader.get.getContentLength) + conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, "") + conf.setBoolean("hive.io.file.read.all.columns", false) + + val orcRecordReader = { + val file = new File(path).listFiles().find(_.getName.endsWith(".snappy.orc")).head + val split = new FileSplit(new Path(file.toURI), 0, file.length, Array.empty[String]) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + val oif = new OrcInputFormat[OrcStruct] + oif.createRecordReader(split, hadoopAttemptContext) + } val recordsIterator = new RecordReaderIterator[OrcStruct](orcRecordReader) try { @@ -614,27 +523,88 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } - test("read from multiple orc input paths") { - val path1 = Utils.createTempDir() - val path2 = Utils.createTempDir() - makeOrcFile((1 to 10).map(Tuple1.apply), path1) - makeOrcFile((1 to 10).map(Tuple1.apply), path2) - assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count()) - } + test("read from multiple orc input paths") { + val path1 = Utils.createTempDir() + val path2 = Utils.createTempDir() + makeOrcFile((1 to 10).map(Tuple1.apply), path1) + makeOrcFile((1 to 10).map(Tuple1.apply), path2) + val df = spark.read.orc(path1.getCanonicalPath, path2.getCanonicalPath) + assert(df.count() == 20) + } +} + +class OrcQuerySuite extends OrcQueryTest with SharedSQLContext { + import testImplicits._ + + test("LZO compression options for writing to an ORC file") { + withTempPath { file => + spark.range(0, 10).write + .option("compression", "LZO") + .orc(file.getCanonicalPath) + + val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".lzo.orc")) + assert(maybeOrcFile.isDefined) + + val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath) + val conf = OrcFile.readerOptions(new Configuration()) + assert("LZO" === OrcFile.createReader(orcFilePath, conf).getCompressionKind.name) + } + } + + test("Schema discovery on empty ORC files") { + // SPARK-8501 is 
fixed. + withTempPath { dir => + val path = dir.getCanonicalPath + + withTable("empty_orc") { + withTempView("empty", "single") { + spark.sql( + s"""CREATE TABLE empty_orc(key INT, value STRING) + |USING ORC + |LOCATION '${dir.toURI}' + """.stripMargin) + + val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1) + emptyDF.createOrReplaceTempView("empty") + + // This creates 1 empty ORC file with ORC SerDe. We are using this trick because + // Spark SQL ORC data source always avoids write empty ORC files. + spark.sql( + s"""INSERT INTO TABLE empty_orc + |SELECT key, value FROM empty + """.stripMargin) + + val df = spark.read.orc(path) + assert(df.schema === emptyDF.schema.asNullable) + checkAnswer(df, emptyDF) + } + } + } + } + + test("SPARK-21791 ORC should support column names with dot") { + withTempDir { dir => + val path = new File(dir, "orc").getCanonicalPath + Seq(Some(1), None).toDF("col.dots").write.orc(path) + assert(spark.read.orc(path).collect().length == 2) + } + } test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") { - Seq( - ("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]), - ("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach { case (i, format) => - - withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> i) { - withTable("spark_20728") { - sql("CREATE TABLE spark_20728(a INT) USING ORC") - val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { - case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass - } - assert(fileFormat == Some(format)) + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "hive") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE spark_20728(a INT) USING ORC") + } + assert(e.message.contains("Hive built-in ORC data source must be used with Hive support")) + } + + withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> "native") { + withTable("spark_20728") { + sql("CREATE TABLE spark_20728(a INT) USING ORC") + val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst { + case l: LogicalRelation => l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass } + assert(fileFormat == Some(classOf[OrcFileFormat])) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala similarity index 63% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 2a086be57f517..6f5f2fd795f74 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.hive.orc +package org.apache.spark.sql.execution.datasources.orc import java.io.File import java.util.Locale @@ -23,50 +23,30 @@ import java.util.Locale import org.apache.orc.OrcConf.COMPRESS import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.execution.datasources.orc.OrcOptions -import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.Row import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types._ +import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils case class OrcData(intField: Int, stringField: String) -abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { - import spark._ +abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { + import testImplicits._ var orcTableDir: File = null var orcTableAsDir: File = null - override def beforeAll(): Unit = { + protected override def beforeAll(): Unit = { super.beforeAll() orcTableAsDir = Utils.createTempDir("orctests", "sparksql") - - // Hack: to prepare orc data files using hive external tables orcTableDir = Utils.createTempDir("orctests", "sparksql") - import org.apache.spark.sql.hive.test.TestHive.implicits._ sparkContext .makeRDD(1 to 10) .map(i => OrcData(i, s"part-$i")) .toDF() - .createOrReplaceTempView(s"orc_temp_table") - - sql( - s"""CREATE EXTERNAL TABLE normal_orc( - | intField INT, - | stringField STRING - |) - |STORED AS ORC - |LOCATION '${orcTableAsDir.toURI}' - """.stripMargin) - - sql( - s"""INSERT INTO TABLE normal_orc - |SELECT intField, stringField FROM orc_temp_table - """.stripMargin) + .createOrReplaceTempView("orc_temp_table") } test("create temporary orc table") { @@ -152,56 +132,13 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - val conf = sqlContext.sessionState.conf + val conf = spark.sessionState.conf val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf) assert(option.compressionCodec == "NONE") } - test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") { - val location = Utils.createTempDir() - val uri = location.toURI - try { - hiveClient.runSqlHive("USE default") - hiveClient.runSqlHive( - """ - |CREATE EXTERNAL TABLE hive_orc( - | a STRING, - | b CHAR(10), - | c VARCHAR(10), - | d ARRAY) - |STORED AS orc""".stripMargin) - // Hive throws an exception if I assign the location in the create table statement. - hiveClient.runSqlHive( - s"ALTER TABLE hive_orc SET LOCATION '$uri'") - hiveClient.runSqlHive( - """ - |INSERT INTO TABLE hive_orc - |SELECT 'a', 'b', 'c', ARRAY(CAST('d' AS CHAR(3))) - |FROM (SELECT 1) t""".stripMargin) - - // We create a different table in Spark using the same schema which points to - // the same location. 
- spark.sql( - s""" - |CREATE EXTERNAL TABLE spark_orc( - | a STRING, - | b CHAR(10), - | c VARCHAR(10), - | d ARRAY) - |STORED AS orc - |LOCATION '$uri'""".stripMargin) - val result = Row("a", "b ", "c", Seq("d ")) - checkAnswer(spark.table("hive_orc"), result) - checkAnswer(spark.table("spark_orc"), result) - } finally { - hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc") - hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc") - Utils.deleteRecursively(location) - } - } - test("SPARK-21839: Add SQL config for ORC compression") { - val conf = sqlContext.sessionState.conf + val conf = spark.sessionState.conf // Test if the default of spark.sql.orc.compression.codec is snappy assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY") @@ -225,13 +162,28 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA } } -class OrcSourceSuite extends OrcSuite { - override def beforeAll(): Unit = { +class OrcSourceSuite extends OrcSuite with SharedSQLContext { + + protected override def beforeAll(): Unit = { super.beforeAll() + sql( + s"""CREATE TABLE normal_orc( + | intField INT, + | stringField STRING + |) + |USING ORC + |LOCATION '${orcTableAsDir.toURI}' + """.stripMargin) + + sql( + s"""INSERT INTO TABLE normal_orc + |SELECT intField, stringField FROM orc_temp_table + """.stripMargin) + spark.sql( s"""CREATE TEMPORARY VIEW normal_orc_source - |USING org.apache.spark.sql.hive.orc + |USING ORC |OPTIONS ( | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}' |) @@ -239,43 +191,10 @@ class OrcSourceSuite extends OrcSuite { spark.sql( s"""CREATE TEMPORARY VIEW normal_orc_as_source - |USING org.apache.spark.sql.hive.orc + |USING ORC |OPTIONS ( | PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}' |) """.stripMargin) } - - test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { - // The `LessThan` should be converted while the `StringContains` shouldn't - val schema = new StructType( - Array( - StructField("a", IntegerType, nullable = true), - StructField("b", StringType, nullable = true))) - assertResult( - """leaf-0 = (LESS_THAN a 10) - |expr = leaf-0 - """.stripMargin.trim - ) { - OrcFilters.createFilter(schema, Array( - LessThan("a", 10), - StringContains("b", "prefix") - )).get.toString - } - - // The `LessThan` should be converted while the whole inner `And` shouldn't - assertResult( - """leaf-0 = (LESS_THAN a 10) - |expr = leaf-0 - """.stripMargin.trim - ) { - OrcFilters.createFilter(schema, Array( - LessThan("a", 10), - Not(And( - GreaterThan("a", 1), - StringContains("b", "prefix") - )) - )).get.toString - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala similarity index 72% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index a2f08c5ba72c6..d94cb850ed2a2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -15,20 +15,51 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.hive.orc +package org.apache.spark.sql.execution.datasources.orc import java.io.File import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag +import org.scalatest.BeforeAndAfterAll + import org.apache.spark.sql._ -import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION import org.apache.spark.sql.test.SQLTestUtils -private[sql] trait OrcTest extends SQLTestUtils with TestHiveSingleton { +/** + * OrcTest + * -> OrcSuite + * -> OrcSourceSuite + * -> HiveOrcSourceSuite + * -> OrcQueryTests + * -> OrcQuerySuite + * -> HiveOrcQuerySuite + * -> OrcPartitionDiscoveryTest + * -> OrcPartitionDiscoverySuite + * -> HiveOrcPartitionDiscoverySuite + * -> OrcFilterSuite + * -> HiveOrcFilterSuite + */ +abstract class OrcTest extends QueryTest with SQLTestUtils with BeforeAndAfterAll { import testImplicits._ + val orcImp: String = "native" + + private var originalConfORCImplementation = "native" + + protected override def beforeAll(): Unit = { + super.beforeAll() + originalConfORCImplementation = conf.getConf(ORC_IMPLEMENTATION) + conf.setConf(ORC_IMPLEMENTATION, orcImp) + } + + protected override def afterAll(): Unit = { + conf.setConf(ORC_IMPLEMENTATION, originalConfORCImplementation) + super.afterAll() + } + /** * Writes `data` to a Orc file, which is then passed to `f` and will be deleted after `f` * returns. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala new file mode 100644 index 0000000000000..283037caf4a9b --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.nio.charset.StandardCharsets +import java.sql.{Date, Timestamp} + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} + +import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.orc.OrcTest +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.types._ + +/** + * A test suite that tests Hive ORC filter API based filter pushdown optimization. 
+ */ +class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { + + override val orcImp: String = "hive" + + private def checkFilterPredicate( + df: DataFrame, + predicate: Predicate, + checker: (SearchArgument) => Unit): Unit = { + val output = predicate.collect { case a: Attribute => a }.distinct + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + var maybeRelation: Option[HadoopFsRelation] = None + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => + maybeRelation = Some(orcRelation) + filters + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") + + val (_, selectedFilters, _) = + DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) + assert(selectedFilters.nonEmpty, "No filter is pushed down") + + val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray) + assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $selectedFilters") + checker(maybeFilter.get) + } + + private def checkFilterPredicate + (predicate: Predicate, filterOperator: PredicateLeaf.Operator) + (implicit df: DataFrame): Unit = { + def checkComparisonOperator(filter: SearchArgument) = { + val operator = filter.getLeaves.asScala + assert(operator.map(_.getOperator).contains(filterOperator)) + } + checkFilterPredicate(df, predicate, checkComparisonOperator) + } + + private def checkFilterPredicate + (predicate: Predicate, stringExpr: String) + (implicit df: DataFrame): Unit = { + def checkLogicalOperator(filter: SearchArgument) = { + assert(filter.toString == stringExpr) + } + checkFilterPredicate(df, predicate, checkLogicalOperator) + } + + private def checkNoFilterPredicate + (predicate: Predicate) + (implicit df: DataFrame): Unit = { + val output = predicate.collect { case a: Attribute => a }.distinct + val query = df + .select(output.map(e => Column(e)): _*) + .where(Column(predicate)) + + var maybeRelation: Option[HadoopFsRelation] = None + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { + case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => + maybeRelation = Some(orcRelation) + filters + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query") + + val (_, selectedFilters, _) = + DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq) + assert(selectedFilters.nonEmpty, "No filter is pushed down") + + val maybeFilter = OrcFilters.createFilter(query.schema, selectedFilters.toArray) + assert(maybeFilter.isEmpty, s"Could generate filter predicate for $selectedFilters") + } + + test("filter pushdown - integer") { + withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS) + 
checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - long") { + withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - float") { + withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - double") { + withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === 1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> 1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < 2, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > 3, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= 1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= 4, PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate(Literal(1) === '_1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate(Literal(1) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate(Literal(2) > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate(Literal(3) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + 
checkFilterPredicate(Literal(1) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(4) <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - string") { + withOrcDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === "1", PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> "1", PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < "2", PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > "3", PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= "1", PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= "4", PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate(Literal("1") === '_1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate(Literal("1") <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate(Literal("2") > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate(Literal("3") < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal("1") >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal("4") <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - boolean") { + withOrcDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === true, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> true, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < true, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > false, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= false, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= false, PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate(Literal(false) === '_1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate(Literal(false) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate(Literal(false) > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate(Literal(true) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(true) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(true) <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - decimal") { + withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === BigDecimal.valueOf(1), PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < BigDecimal.valueOf(2), PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > BigDecimal.valueOf(3), PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= BigDecimal.valueOf(1), PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= BigDecimal.valueOf(4), PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate( + Literal(BigDecimal.valueOf(1)) === '_1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate( + Literal(BigDecimal.valueOf(1)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate( + Literal(BigDecimal.valueOf(2)) > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate( + Literal(BigDecimal.valueOf(3)) < '_1, 
PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate( + Literal(BigDecimal.valueOf(1)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate( + Literal(BigDecimal.valueOf(4)) <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - timestamp") { + val timeString = "2015-08-20 14:57:00" + val timestamps = (1 to 4).map { i => + val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600 + new Timestamp(milliseconds) + } + withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df => + checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL) + + checkFilterPredicate('_1 === timestamps(0), PredicateLeaf.Operator.EQUALS) + checkFilterPredicate('_1 <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS) + + checkFilterPredicate('_1 < timestamps(1), PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate('_1 > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate('_1 >= timestamps(3), PredicateLeaf.Operator.LESS_THAN) + + checkFilterPredicate(Literal(timestamps(0)) === '_1, PredicateLeaf.Operator.EQUALS) + checkFilterPredicate(Literal(timestamps(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS) + checkFilterPredicate(Literal(timestamps(1)) > '_1, PredicateLeaf.Operator.LESS_THAN) + checkFilterPredicate(Literal(timestamps(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(timestamps(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS) + checkFilterPredicate(Literal(timestamps(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN) + } + } + + test("filter pushdown - combinations with logical operators") { + withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df => + // Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked + // in string form in order to check filter creation including logical operators + // such as `and`, `or` or `not`. So, this function uses `SearchArgument.toString()` + // to produce string expression and then compare it to given string expression below. + // This might have to be changed after Hive version is upgraded. 
+ checkFilterPredicate( + '_1.isNotNull, + """leaf-0 = (IS_NULL _1) + |expr = (not leaf-0)""".stripMargin.trim + ) + checkFilterPredicate( + '_1 =!= 1, + """leaf-0 = (IS_NULL _1) + |leaf-1 = (EQUALS _1 1) + |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim + ) + checkFilterPredicate( + !('_1 < 4), + """leaf-0 = (IS_NULL _1) + |leaf-1 = (LESS_THAN _1 4) + |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim + ) + checkFilterPredicate( + '_1 < 2 || '_1 > 3, + """leaf-0 = (LESS_THAN _1 2) + |leaf-1 = (LESS_THAN_EQUALS _1 3) + |expr = (or leaf-0 (not leaf-1))""".stripMargin.trim + ) + checkFilterPredicate( + '_1 < 2 && '_1 > 3, + """leaf-0 = (IS_NULL _1) + |leaf-1 = (LESS_THAN _1 2) + |leaf-2 = (LESS_THAN_EQUALS _1 3) + |expr = (and (not leaf-0) leaf-1 (not leaf-2))""".stripMargin.trim + ) + } + } + + test("no filter pushdown - non-supported types") { + implicit class IntToBinary(int: Int) { + def b: Array[Byte] = int.toString.getBytes(StandardCharsets.UTF_8) + } + // ArrayType + withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df => + checkNoFilterPredicate('_1.isNull) + } + // BinaryType + withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => + checkNoFilterPredicate('_1 <=> 1.b) + } + // DateType + val stringDate = "2015-01-01" + withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df => + checkNoFilterPredicate('_1 === Date.valueOf(stringDate)) + } + // MapType + withOrcDataFrame((1 to 4).map(i => Tuple1(Map(i -> i)))) { implicit df => + checkNoFilterPredicate('_1.isNotNull) + } + } + + test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { + import org.apache.spark.sql.sources._ + // The `LessThan` should be converted while the `StringContains` shouldn't + val schema = new StructType( + Array( + StructField("a", IntegerType, nullable = true), + StructField("b", StringType, nullable = true))) + assertResult( + """leaf-0 = (LESS_THAN a 10) + |expr = leaf-0 + """.stripMargin.trim + ) { + OrcFilters.createFilter(schema, Array( + LessThan("a", 10), + StringContains("b", "prefix") + )).get.toString + } + + // The `LessThan` should be converted while the whole inner `And` shouldn't + assertResult( + """leaf-0 = (LESS_THAN a 10) + |expr = leaf-0 + """.stripMargin.trim + ) { + OrcFilters.createFilter(schema, Array( + LessThan("a", 10), + Not(And( + GreaterThan("a", 1), + StringContains("b", "prefix") + )) + )).get.toString + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcPartitionDiscoverySuite.scala new file mode 100644 index 0000000000000..ab9b492f347c6 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcPartitionDiscoverySuite.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import org.apache.spark.sql.execution.datasources.orc.OrcPartitionDiscoveryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveOrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with TestHiveSingleton {
+  override val orcImp: String = "hive"
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
new file mode 100644
index 0000000000000..7244c369bd3f4
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.orc.OrcQueryTest
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+
+class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
+  import testImplicits._
+
+  override val orcImp: String = "hive"
+
+  test("SPARK-8501: Avoids discovery schema from empty ORC files") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withTable("empty_orc") {
+        withTempView("empty", "single") {
+          spark.sql(
+            s"""CREATE TABLE empty_orc(key INT, value STRING)
+               |STORED AS ORC
+               |LOCATION '${dir.toURI}'
+             """.stripMargin)
+
+          val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+          emptyDF.createOrReplaceTempView("empty")
+
+          // This creates 1 empty ORC file with the Hive ORC SerDe. We use this trick because
+          // the Spark SQL ORC data source always avoids writing empty ORC files.
+          spark.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM empty
+             """.stripMargin)
+
+          val errorMessage = intercept[AnalysisException] {
+            spark.read.orc(path)
+          }.getMessage
+
+          assert(errorMessage.contains("Unable to infer schema for ORC"))
+
+          val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
+          singleRowDF.createOrReplaceTempView("single")
+
+          spark.sql(
+            s"""INSERT INTO TABLE empty_orc
+               |SELECT key, value FROM single
+             """.stripMargin)
+
+          val df = spark.read.orc(path)
+          assert(df.schema === singleRowDF.schema.asNullable)
+          checkAnswer(df, singleRowDF)
+        }
+      }
+    }
+  }
+
+  test("Verify the ORC conversion parameter: CONVERT_METASTORE_ORC") {
+    withTempView("single") {
+      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+      singleRowDF.createOrReplaceTempView("single")
+
+      Seq("true", "false").foreach { orcConversion =>
+        withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> orcConversion) {
+          withTable("dummy_orc") {
+            withTempPath { dir =>
+              val path = dir.getCanonicalPath
+              spark.sql(
+                s"""
+                   |CREATE TABLE dummy_orc(key INT, value STRING)
+                   |STORED AS ORC
+                   |LOCATION '${dir.toURI}'
+                 """.stripMargin)
+
+              spark.sql(
+                s"""
+                   |INSERT INTO TABLE dummy_orc
+                   |SELECT key, value FROM single
+                 """.stripMargin)
+
+              val df = spark.sql("SELECT * FROM dummy_orc WHERE key=0")
+              checkAnswer(df, singleRowDF)
+
+              val queryExecution = df.queryExecution
+              if (orcConversion == "true") {
+                queryExecution.analyzed.collectFirst {
+                  case _: LogicalRelation => ()
+                }.getOrElse {
+                  fail(s"Expecting the query plan to convert orc to data sources, " +
+                    s"but got:\n$queryExecution")
+                }
+              } else {
+                queryExecution.analyzed.collectFirst {
+                  case _: HiveTableRelation => ()
+                }.getOrElse {
+                  fail(s"Expecting no conversion from orc to data sources, " +
+                    s"but got:\n$queryExecution")
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("converted ORC table supports resolving mixed case field") {
+    withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
+      withTable("dummy_orc") {
+        withTempPath { dir =>
+          val df = spark.range(5).selectExpr("id", "id as valueField", "id as partitionValue")
+          df.write
+            .partitionBy("partitionValue")
+            .mode("overwrite")
+            .orc(dir.getAbsolutePath)
+
+          spark.sql(s"""
+               |create external table dummy_orc (id long, valueField long)
+               |partitioned by (partitionValue int)
+               |stored as orc
+               |location "${dir.toURI}"""".stripMargin)
+          spark.sql(s"msck repair table dummy_orc")
+          checkAnswer(spark.sql("select * from dummy_orc"), df)
+        }
+      }
+    }
+  }
+
+  test("SPARK-20728 Make ORCFileFormat configurable between sql/hive and sql/core") {
+    Seq(
+      ("native", classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat]),
+      ("hive", classOf[org.apache.spark.sql.hive.orc.OrcFileFormat])).foreach {
+      case (orcImpl, format) =>
+        withSQLConf(SQLConf.ORC_IMPLEMENTATION.key -> orcImpl) {
+          withTable("spark_20728") {
+            sql("CREATE TABLE spark_20728(a INT) USING ORC")
+            val fileFormat = sql("SELECT * FROM spark_20728").queryExecution.analyzed.collectFirst {
+              case l: LogicalRelation =>
+                l.relation.asInstanceOf[HadoopFsRelation].fileFormat.getClass
+            }
+            assert(fileFormat == Some(format))
+          }
+        }
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
new file mode 100644
index 0000000000000..17b7d8cfe127e
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.datasources.orc.OrcSuite
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.util.Utils
+
+class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
+
+  override val orcImp: String = "hive"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(
+      s"""CREATE EXTERNAL TABLE normal_orc(
+         |  intField INT,
+         |  stringField STRING
+         |)
+         |STORED AS ORC
+         |LOCATION '${orcTableAsDir.toURI}'
+       """.stripMargin)
+
+    sql(
+      s"""INSERT INTO TABLE normal_orc
+         |SELECT intField, stringField FROM orc_temp_table
+       """.stripMargin)
+
+    spark.sql(
+      s"""CREATE TEMPORARY VIEW normal_orc_source
+         |USING org.apache.spark.sql.hive.orc
+         |OPTIONS (
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+         |)
+       """.stripMargin)
+
+    spark.sql(
+      s"""CREATE TEMPORARY VIEW normal_orc_as_source
+         |USING org.apache.spark.sql.hive.orc
+         |OPTIONS (
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+         |)
+       """.stripMargin)
+  }
+
+  test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
+    val location = Utils.createTempDir()
+    val uri = location.toURI
+    try {
+      hiveClient.runSqlHive("USE default")
+      hiveClient.runSqlHive(
+        """
+          |CREATE EXTERNAL TABLE hive_orc(
+          |  a STRING,
+          |  b CHAR(10),
+          |  c VARCHAR(10),
+          |  d ARRAY<CHAR(3)>)
+          |STORED AS orc""".stripMargin)
+      // Hive throws an exception if the location is assigned in the CREATE TABLE statement.
+      hiveClient.runSqlHive(
+        s"ALTER TABLE hive_orc SET LOCATION '$uri'")
+      hiveClient.runSqlHive(
+        """
+          |INSERT INTO TABLE hive_orc
+          |SELECT 'a', 'b', 'c', ARRAY(CAST('d' AS CHAR(3)))
+          |FROM (SELECT 1) t""".stripMargin)
+
+      // We create a different table in Spark using the same schema which points to
+      // the same location.
+      spark.sql(
+        s"""
+           |CREATE EXTERNAL TABLE spark_orc(
+           |  a STRING,
+           |  b CHAR(10),
+           |  c VARCHAR(10),
+           |  d ARRAY<CHAR(3)>)
+           |STORED AS orc
+           |LOCATION '$uri'""".stripMargin)
+      val result = Row("a", "b         ", "c", Seq("d  "))
+      checkAnswer(spark.table("hive_orc"), result)
+      checkAnswer(spark.table("spark_orc"), result)
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
+      Utils.deleteRecursively(location)
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index ba0a7605da71c..f87162f94c01a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.types._
 class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
   import testImplicits._
 
-  override val dataSourceName: String = classOf[OrcFileFormat].getCanonicalName
+  override val dataSourceName: String =
+    classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat].getCanonicalName
 
   // ORC does not play well with NullType and UDT.
   override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
@@ -116,3 +117,8 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
     }
   }
 }
+
+class HiveOrcHadoopFsRelationSuite extends OrcHadoopFsRelationSuite {
+  override val dataSourceName: String =
+    classOf[org.apache.spark.sql.hive.orc.OrcFileFormat].getCanonicalName
+}