/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.orc

import java.io.File

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Attribute, Predicate}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.FileBasedDataSourceTest
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
import org.apache.spark.util.ArrayImplicits._

/**
 * A hierarchy of test suites is built on top of this trait:
 *
 * OrcTest
 *   -> OrcSuite
 *     -> OrcSourceSuite
 *     -> HiveOrcSourceSuite
 *   -> OrcQueryTests
 *     -> OrcQuerySuite
 *     -> HiveOrcQuerySuite
 *   -> OrcPartitionDiscoveryTest
 *     -> OrcPartitionDiscoverySuite
 *     -> HiveOrcPartitionDiscoverySuite
 *   -> OrcFilterSuite
 */
trait OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfterAll {

  val orcImp: String = "native"

  private var originalConfORCImplementation = "native"

  override protected val dataSourceName: String = "orc"
  override protected val vectorizedReaderEnabledKey: String =
    SQLConf.ORC_VECTORIZED_READER_ENABLED.key
  override protected val vectorizedReaderNestedEnabledKey: String =
    SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key

  protected override def beforeAll(): Unit = {
    super.beforeAll()
    originalConfORCImplementation = spark.conf.get(ORC_IMPLEMENTATION)
    spark.conf.set(ORC_IMPLEMENTATION.key, orcImp)
  }

  protected override def afterAll(): Unit = {
    spark.conf.set(ORC_IMPLEMENTATION.key, originalConfORCImplementation)
    super.afterAll()
  }
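
  // Example (an illustrative sketch, not part of this file): Hive-backed suites
  // swap in the `hive` ORC implementation by overriding `orcImp`, along the lines
  // of the following (suite and mixin names are assumptions):
  //
  //   class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
  //     override val orcImp: String = "hive"
  //   }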

  /**
   * Writes `data` to an ORC file, which is then passed to `f` and will be deleted after `f`
   * returns.
   */
  protected def withOrcFile[T <: Product: ClassTag: TypeTag]
      (data: Seq[T])
      (f: String => Unit): Unit = withDataSourceFile(data)(f)
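
  // Example (sketch, not part of the original file): `withOrcFile` hands `f` the
  // path of a freshly written ORC file and deletes it afterwards. `OrcData` is a
  // hypothetical top-level case class (a `TypeTag` is required for it).
  //
  //   case class OrcData(i: Int, s: String)
  //
  //   test("round-trip through an ORC file") {
  //     withOrcFile((1 to 3).map(i => OrcData(i, i.toString))) { path =>
  //       checkAnswer(spark.read.orc(path), (1 to 3).map(i => Row(i, i.toString)))
  //     }
  //   }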

  /**
   * Writes `data` to an ORC file and reads it back as a `DataFrame`,
   * which is then passed to `f`. The ORC file will be deleted after `f` returns.
   */
  protected def withOrcDataFrame[T <: Product: ClassTag: TypeTag]
      (data: Seq[T], testVectorized: Boolean = true)
      (f: DataFrame => Unit): Unit = withDataSourceDataFrame(data, testVectorized)(f)
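
  // Example (sketch; reuses the hypothetical `OrcData` from above): `f` receives
  // the data read back through the ORC reader, so assertions exercise the full
  // write-then-read path under the readers selected by `testVectorized`.
  //
  //   withOrcDataFrame((1 to 10).map(i => OrcData(i, i.toString))) { df =>
  //     checkAnswer(df.where("i > 8"), Seq(Row(9, "9"), Row(10, "10")))
  //   }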

  /**
   * Writes `data` to an ORC file, reads it back as a `DataFrame` and registers it as a
   * temporary table named `tableName`, then calls `f`. The temporary table together with the
   * ORC file will be dropped/deleted after `f` returns.
   */
  protected def withOrcTable[T <: Product: ClassTag: TypeTag]
      (data: Seq[T], tableName: String, testVectorized: Boolean = true)
      (f: => Unit): Unit = withDataSourceTable(data, tableName, testVectorized)(f)
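
  // Example (sketch; `OrcData` is the hypothetical case class from above): the
  // registered temp table lets the body run plain SQL against the ORC data.
  //
  //   withOrcTable((1 to 3).map(i => OrcData(i, i.toString)), "t") {
  //     checkAnswer(spark.sql("SELECT s FROM t WHERE i = 2"), Seq(Row("2")))
  //   }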

  /** Writes `data` as an ORC file at `path` via the configured data source. */
  protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
      data: Seq[T], path: File): Unit = makeDataSourceFile(data, path)

  /** Writes the contents of `df` as an ORC file at `path` via the configured data source. */
  protected def makeOrcFile[T <: Product: ClassTag: TypeTag](
      df: DataFrame, path: File): Unit = makeDataSourceFile(df, path)

  protected def checkPredicatePushDown(df: DataFrame, numRows: Int, predicate: String): Unit = {
    withTempPath { file =>
      // Repartition the data so that several ORC files are written, giving the
      // reader stripes it can skip when the predicate is pushed down.
      df.repartition(numRows).write.orc(file.getCanonicalPath)
      val actual = stripSparkFilter(spark.read.orc(file.getCanonicalPath).where(predicate)).count()
      assert(actual < numRows)
    }
  }
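
  // Example (sketch): with one row per file, a selective pushed-down predicate
  // should let the ORC reader skip whole files/stripes, so the row count after
  // stripping Spark's own Filter stays below `numRows`.
  //
  //   checkPredicatePushDown(spark.range(10).toDF("id"), 10, "id = 5")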

  /**
   * Checks that `predicate` is not converted into an ORC search argument: either no filter
   * is pushed down at all (`noneSupported = true`), or filters are pushed down but
   * `OrcFilters.createFilter` cannot produce a predicate from them.
   */
  protected def checkNoFilterPredicate
      (predicate: Predicate, noneSupported: Boolean = false)
      (implicit df: DataFrame): Unit = {
    val output = predicate.collect { case a: Attribute => a }.distinct
    val query = df
      .select(output.map(e => Column(e)): _*)
      .where(Column(predicate))

    query.queryExecution.optimizedPlan match {
      case PhysicalOperation(_, filters, DataSourceV2ScanRelation(_, o: OrcScan, _, _, _)) =>
        assert(filters.nonEmpty, "No filter is analyzed from the given query")
        if (noneSupported) {
          assert(o.pushedFilters.isEmpty, "Unsupported filters should not show in pushed filters")
        } else {
          assert(o.pushedFilters.nonEmpty, "No filter is pushed down")
          val maybeFilter = OrcFilters
            .createFilter(query.schema, o.pushedFilters.toImmutableArraySeq)
          // This helper verifies *unsupported* predicates, so no ORC filter should result.
          assert(maybeFilter.isEmpty, s"Filter predicate should not be generated for " +
            s"${o.pushedFilters.mkString("pushedFilters(", ", ", ")")}")
        }

      case _ => assert(false, "Cannot match OrcTable in the query.")
    }
  }
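
  // Example (sketch): predicates on complex types such as arrays are not
  // convertible to ORC search arguments, so nothing should be pushed down. The
  // explicit `.expr` cast bridges from `Column` to the catalyst `Predicate`
  // parameter (assumes `import testImplicits._` for the `$` interpolator).
  //
  //   withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
  //     checkNoFilterPredicate($"_1".isNull.expr.asInstanceOf[Predicate],
  //       noneSupported = true)
  //   }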

  protected def readResourceOrcFile(name: String): DataFrame = {
    val url = Thread.currentThread().getContextClassLoader.getResource(name)
    // Copy to avoid URISyntaxException when `sql/hive` accesses the resources in `sql/core`
    val file = File.createTempFile("orc-test", ".orc")
    file.deleteOnExit()
    FileUtils.copyURLToFile(url, file)
    spark.read.orc(file.getAbsolutePath)
  }
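
  // Example (sketch; the resource name is hypothetical and must exist on the
  // test classpath):
  //
  //   val df = readResourceOrcFile("test-data/example.orc")
  //   assert(df.count() > 0)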

  def withAllNativeOrcReaders(code: => Unit): Unit = {
    // Test the row-based reader
    withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")(code)
    // Test the vectorized reader
    withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "true")(code)
  }
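
  // Example (sketch): run the same assertion under both reader code paths; the
  // `path` and `expected` values are assumed to come from the surrounding test.
  //
  //   withAllNativeOrcReaders {
  //     checkAnswer(spark.read.orc(path), expected)
  //   }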

  /**
   * Takes a sequence of products `data` and generates multi-level nested
   * DataFrames as new test data. It tests both non-nested and nested DataFrames,
   * which are written and read back with the ORC data source.
   *
   * This is different from [[withOrcDataFrame]], which does not
   * test nested cases.
   */
  protected def withNestedOrcDataFrame[T <: Product: ClassTag: TypeTag](data: Seq[T])
      (runTest: (DataFrame, String, Any => Any) => Unit): Unit =
    withNestedOrcDataFrame(spark.createDataFrame(data))(runTest)

  protected def withNestedOrcDataFrame(inputDF: DataFrame)
      (runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
    withNestedDataFrame(inputDF).foreach { case (newDF, colName, resultFun) =>
      withTempPath { file =>
        newDF.write.format(dataSourceName).save(file.getCanonicalPath)
        readFile(file.getCanonicalPath, true) { df => runTest(df, colName, resultFun) }
      }
    }
  }
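
  // Example (sketch): `runTest` is invoked once per nesting variant; it receives
  // the DataFrame read back from ORC, the (possibly dotted) column name to filter
  // on, and a function mapping a raw value to the expected result at that
  // nesting level.
  //
  //   withNestedOrcDataFrame((1 to 3).map(i => Tuple1(i))) { case (df, colName, resultFun) =>
  //     checkAnswer(df.where(s"$colName = 2"), Seq(Row(resultFun(2))))
  //   }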
}