-
Notifications
You must be signed in to change notification settings - Fork 28k
/
ParquetSchemaPruningSuite.scala
370 lines (320 loc) · 14.8 KB
/
ParquetSchemaPruningSuite.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import org.scalactic.Equality
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.SchemaPruningTest
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
/**
 * Verifies Parquet schema pruning: a query that selects only some fields of a complex
 * (struct / array-of-struct / map-of-struct) column should read only those fields from the
 * Parquet files. `checkScan` asserts the pruned schema actually requested from the file scan,
 * and `checkAnswer` asserts the query still returns correct results.
 *
 * Each `testSchemaPruning` case is registered four times: vectorized vs. parquet-mr reader,
 * each with and without an explicit data partition column. Each `testMixedCasePruning` case is
 * registered four times across the {vectorized, parquet-mr} x {case-sensitive, -insensitive}
 * matrix.
 */
class ParquetSchemaPruningSuite
    extends QueryTest
    with ParquetTest
    with SchemaPruningTest
    with SharedSQLContext {

  case class FullName(first: String, middle: String, last: String)
  case class Company(name: String, address: String)
  case class Employer(id: Int, company: Company)
  case class Contact(
      id: Int,
      name: FullName,
      address: String,
      pets: Int,
      friends: Array[FullName] = Array.empty,
      relatives: Map[String, FullName] = Map.empty,
      employer: Employer = null)

  val janeDoe = FullName("Jane", "X.", "Doe")
  val johnDoe = FullName("John", "Y.", "Doe")
  val susanSmith = FullName("Susan", "Z.", "Smith")

  val employer = Employer(0, Company("abc", "123 Business Street"))
  val employerWithNullCompany = Employer(1, null)

  // "Full" contacts, written under partition p=1.
  private val contacts =
    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
      relatives = Map("brother" -> johnDoe), employer = employer) ::
    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe),
      employer = employerWithNullCompany) :: Nil

  case class Name(first: String, last: String)
  case class BriefContact(id: Int, name: Name, address: String)

  // "Brief" contacts with a narrower schema (no middle name, pets, friends, relatives or
  // employer), written under partition p=2. Queries over the merged schema read nulls for the
  // missing fields.
  private val briefContacts =
    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil

  // Variants of the two record types that carry the partition column `p` as a regular data
  // column, so the same tests can run with the partition value also present in the files.
  case class ContactWithDataPartitionColumn(
      id: Int,
      name: FullName,
      address: String,
      pets: Int,
      friends: Array[FullName] = Array(),
      relatives: Map[String, FullName] = Map(),
      employer: Employer = null,
      p: Int)

  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)

  private val contactsWithDataPartitionColumn =
    contacts.map { case Contact(id, name, address, pets, friends, relatives, employer) =>
      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, employer, 1) }
  private val briefContactsWithDataPartitionColumn =
    briefContacts.map { case BriefContact(id, name, address) =>
      BriefContactWithDataPartitionColumn(id, name, address, 2) }

  testSchemaPruning("select a single complex field") {
    val query = sql("select name.middle from contacts")
    checkScan(query, "struct<name:struct<middle:string>>")
    checkAnswer(query.orderBy("id"), Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
  }

  testSchemaPruning("select a single complex field and its parent struct") {
    val query = sql("select name.middle, name from contacts")
    // Selecting the whole struct forces all of its fields to be read.
    checkScan(query, "struct<name:struct<first:string,middle:string,last:string>>")
    checkAnswer(query.orderBy("id"),
      Row("X.", Row("Jane", "X.", "Doe")) ::
      Row("Y.", Row("John", "Y.", "Doe")) ::
      Row(null, Row("Janet", null, "Jones")) ::
      Row(null, Row("Jim", null, "Jones")) ::
      Nil)
  }

  testSchemaPruning("select a single complex field array and its parent struct array") {
    val query = sql("select friends.middle, friends from contacts where p=1")
    checkScan(query,
      "struct<friends:array<struct<first:string,middle:string,last:string>>>")
    checkAnswer(query.orderBy("id"),
      Row(Array("Z."), Array(Row("Susan", "Z.", "Smith"))) ::
      Row(Array.empty[String], Array.empty[Row]) ::
      Nil)
  }

  testSchemaPruning("select a single complex field from a map entry and its parent map entry") {
    val query =
      sql("select relatives[\"brother\"].middle, relatives[\"brother\"] from contacts where p=1")
    checkScan(query,
      "struct<relatives:map<string,struct<first:string,middle:string,last:string>>>")
    checkAnswer(query.orderBy("id"),
      Row("Y.", Row("John", "Y.", "Doe")) ::
      Row(null, null) ::
      Nil)
  }

  testSchemaPruning("select a single complex field and the partition column") {
    val query = sql("select name.middle, p from contacts")
    // The partition column comes from the directory layout, not the files, so it does not
    // appear in the scanned file schema.
    checkScan(query, "struct<name:struct<middle:string>>")
    checkAnswer(query.orderBy("id"),
      Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
  }

  ignore("partial schema intersection - select missing subfield") {
    val query = sql("select name.middle, address from contacts where p=2")
    checkScan(query, "struct<name:struct<middle:string>,address:string>")
    checkAnswer(query.orderBy("id"),
      Row(null, "567 Maple Drive") ::
      Row(null, "6242 Ash Street") :: Nil)
  }

  testSchemaPruning("no unnecessary schema pruning") {
    val query =
      sql("select id, name.last, name.middle, name.first, relatives[''].last, " +
        "relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " +
        "friends[0].first, pets, address from contacts where p=2")
    // We've selected every field in the schema. Therefore, no schema pruning should be performed.
    // We check this by asserting that the scanned schema of the query is identical to the schema
    // of the contacts relation, even though the fields are selected in different orders.
    checkScan(query,
      "struct<id:int,name:struct<first:string,middle:string,last:string>,address:string,pets:int," +
      "friends:array<struct<first:string,middle:string,last:string>>," +
      "relatives:map<string,struct<first:string,middle:string,last:string>>>")
    checkAnswer(query.orderBy("id"),
      Row(2, "Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") ::
      Row(3, "Jones", null, "Jim", null, null, null, null, null, null, null, "6242 Ash Street") ::
      Nil)
  }

  testSchemaPruning("empty schema intersection") {
    val query = sql("select name.middle from contacts where p=2")
    checkScan(query, "struct<name:struct<middle:string>>")
    checkAnswer(query.orderBy("id"),
      Row(null) :: Row(null) :: Nil)
  }

  testSchemaPruning("select a single complex field and in where clause") {
    val query1 = sql("select name.first from contacts where name.first = 'Jane'")
    checkScan(query1, "struct<name:struct<first:string>>")
    checkAnswer(query1, Row("Jane") :: Nil)

    val query2 = sql("select name.first, name.last from contacts where name.first = 'Jane'")
    checkScan(query2, "struct<name:struct<first:string,last:string>>")
    checkAnswer(query2, Row("Jane", "Doe") :: Nil)

    val query3 = sql("select name.first from contacts " +
      "where employer.company.name = 'abc' and p = 1")
    checkScan(query3, "struct<name:struct<first:string>," +
      "employer:struct<company:struct<name:string>>>")
    checkAnswer(query3, Row("Jane") :: Nil)

    val query4 = sql("select name.first, employer.company.name from contacts " +
      "where employer.company is not null and p = 1")
    checkScan(query4, "struct<name:struct<first:string>," +
      "employer:struct<company:struct<name:string>>>")
    checkAnswer(query4, Row("Jane", "abc") :: Nil)
  }

  testSchemaPruning("select nullable complex field and having is not null predicate") {
    val query = sql("select employer.company from contacts " +
      "where employer is not null and p = 1")
    checkScan(query, "struct<employer:struct<company:struct<name:string,address:string>>>")
    checkAnswer(query, Row(Row("abc", "123 Business Street")) :: Row(null) :: Nil)
  }

  testSchemaPruning("select a single complex field and is null expression in project") {
    val query = sql("select name.first, address is not null from contacts")
    checkScan(query, "struct<name:struct<first:string>,address:string>")
    checkAnswer(query.orderBy("id"),
      Row("Jane", true) :: Row("John", true) :: Row("Janet", true) :: Row("Jim", true) :: Nil)
  }

  testSchemaPruning("select a single complex field array and in clause") {
    val query = sql("select friends.middle from contacts where friends.first[0] = 'Susan'")
    checkScan(query,
      "struct<friends:array<struct<first:string,middle:string>>>")
    checkAnswer(query.orderBy("id"),
      Row(Array("Z.")) :: Nil)
  }

  testSchemaPruning("select a single complex field from a map entry and in clause") {
    val query =
      sql("select relatives[\"brother\"].middle from contacts " +
        "where relatives[\"brother\"].first = 'John'")
    checkScan(query,
      "struct<relatives:map<string,struct<first:string,middle:string>>>")
    checkAnswer(query.orderBy("id"),
      Row("Y.") :: Nil)
  }

  /**
   * Registers `testThunk` as four tests covering the {vectorized, parquet-mr} reader x
   * {without, with} data-partition-column matrix.
   *
   * NOTE(fix): `withSQLConf` must be applied INSIDE each registered test body, not around the
   * `test(...)` registration calls. `test(...)` only registers a thunk for later execution;
   * wrapping the registration in `withSQLConf` sets and immediately restores the conf at
   * suite-construction time, so every variant would actually run under the default reader
   * configuration and the vectorized/parquet-mr distinction would never be exercised.
   */
  private def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
    test(s"Spark vectorized reader - without partition data column - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
        withContacts(testThunk)
      }
    }
    test(s"Spark vectorized reader - with partition data column - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
        withContactsWithDataPartitionColumn(testThunk)
      }
    }

    test(s"Parquet-mr reader - without partition data column - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
        withContacts(testThunk)
      }
    }
    test(s"Parquet-mr reader - with partition data column - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
        withContactsWithDataPartitionColumn(testThunk)
      }
    }
  }

  /** Writes the full and brief contacts under p=1/p=2 and exposes them as view `contacts`. */
  private def withContacts(testThunk: => Unit): Unit = {
    withTempPath { dir =>
      val path = dir.getCanonicalPath

      makeParquetFile(contacts, new File(path + "/contacts/p=1"))
      makeParquetFile(briefContacts, new File(path + "/contacts/p=2"))

      spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts")

      testThunk
    }
  }

  /** Same as [[withContacts]] but with the partition value also stored as a data column. */
  private def withContactsWithDataPartitionColumn(testThunk: => Unit): Unit = {
    withTempPath { dir =>
      val path = dir.getCanonicalPath

      makeParquetFile(contactsWithDataPartitionColumn, new File(path + "/contacts/p=1"))
      makeParquetFile(briefContactsWithDataPartitionColumn, new File(path + "/contacts/p=2"))

      spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts")

      testThunk
    }
  }

  case class MixedCaseColumn(a: String, B: Int)
  case class MixedCase(id: Int, CoL1: String, coL2: MixedCaseColumn)

  private val mixedCaseData =
    MixedCase(0, "r0c1", MixedCaseColumn("abc", 1)) ::
    MixedCase(1, "r1c1", MixedCaseColumn("123", 2)) ::
    Nil

  testMixedCasePruning("select with exact column names") {
    val query = sql("select CoL1, coL2.B from mixedcase")
    checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
    checkAnswer(query.orderBy("id"),
      Row("r0c1", 1) ::
      Row("r1c1", 2) ::
      Nil)
  }

  testMixedCasePruning("select with lowercase column names") {
    val query = sql("select col1, col2.b from mixedcase")
    checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
    checkAnswer(query.orderBy("id"),
      Row("r0c1", 1) ::
      Row("r1c1", 2) ::
      Nil)
  }

  testMixedCasePruning("select with different-case column names") {
    val query = sql("select cOL1, cOl2.b from mixedcase")
    checkScan(query, "struct<CoL1:string,coL2:struct<B:int>>")
    checkAnswer(query.orderBy("id"),
      Row("r0c1", 1) ::
      Row("r1c1", 2) ::
      Nil)
  }

  testMixedCasePruning("filter with different-case column names") {
    val query = sql("select id from mixedcase where Col2.b = 2")
    checkScan(query, "struct<id:int,coL2:struct<B:int>>")
    checkAnswer(query.orderBy("id"), Row(1) :: Nil)
  }

  /**
   * Registers `testThunk` across the {vectorized, parquet-mr} x {case-sensitive,
   * case-insensitive} matrix against a mixed-case schema.
   *
   * NOTE(fix): as in [[testSchemaPruning]], `withSQLConf` is applied inside each test body so
   * the confs are actually in effect when the test executes, not only at registration time.
   * Note the case-insensitive parser is deliberately paired once with each reader (and
   * vice versa), matching the original four combinations.
   */
  private def testMixedCasePruning(testName: String)(testThunk: => Unit): Unit = {
    test(s"Spark vectorized reader - case-sensitive parser - mixed-case schema - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
          SQLConf.CASE_SENSITIVE.key -> "true") {
        withMixedCaseData(testThunk)
      }
    }
    test(s"Parquet-mr reader - case-insensitive parser - mixed-case schema - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
          SQLConf.CASE_SENSITIVE.key -> "false") {
        withMixedCaseData(testThunk)
      }
    }
    test(s"Spark vectorized reader - case-insensitive parser - mixed-case schema - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true",
          SQLConf.CASE_SENSITIVE.key -> "false") {
        withMixedCaseData(testThunk)
      }
    }
    test(s"Parquet-mr reader - case-sensitive parser - mixed-case schema - $testName") {
      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
          SQLConf.CASE_SENSITIVE.key -> "true") {
        withMixedCaseData(testThunk)
      }
    }
  }

  /** Exposes the mixed-case fixture as temp view `mixedcase` for the duration of the thunk. */
  private def withMixedCaseData(testThunk: => Unit): Unit = {
    withParquetTable(mixedCaseData, "mixedcase") {
      testThunk
    }
  }

  // Compares schemata structurally (field names and types) rather than by metadata/nullability,
  // via StructType.sameType.
  private val schemaEquality = new Equality[StructType] {
    override def areEqual(a: StructType, b: Any): Boolean =
      b match {
        case otherType: StructType => a.sameType(otherType)
        case _ => false
      }
  }

  /**
   * Asserts that the file scans in `df`'s physical plan request exactly the given schemata
   * (one catalog string per expected file source scan), then executes the query.
   */
  protected def checkScan(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
    checkScanSchemata(df, expectedSchemaCatalogStrings: _*)
    // We check here that we can execute the query without throwing an exception. The results
    // themselves are irrelevant, and should be checked elsewhere as needed
    df.collect()
  }

  // Collects the requiredSchema of every FileSourceScanExec in the executed plan and matches
  // each, in order, against the parsed expected catalog string.
  private def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
    val fileSourceScanSchemata =
      df.queryExecution.executedPlan.collect {
        case scan: FileSourceScanExec => scan.requiredSchema
      }
    assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
      s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " +
        s"but expected $expectedSchemaCatalogStrings")
    fileSourceScanSchemata.zip(expectedSchemaCatalogStrings).foreach {
      case (scanSchema, expectedScanSchemaCatalogString) =>
        val expectedScanSchema = CatalystSqlParser.parseDataType(expectedScanSchemaCatalogString)
        implicit val equality = schemaEquality
        assert(scanSchema === expectedScanSchema)
    }
  }
}