[SPARK-27034][SQL] Nested schema pruning for ORC #23943
Conversation
Test build #102949 has finished for PR 23943 at commit
retest this please
Test build #102950 has finished for PR 23943 at commit
Test build #102953 has finished for PR 23943 at commit
Test build #102960 has finished for PR 23943 at commit
Test build #102979 has finished for PR 23943 at commit
@@ -81,7 +82,8 @@ object ParquetSchemaPruning extends Rule[LogicalPlan]
 * Checks to see if the given relation is Parquet and can be pruned.
ParquetSchemaPruning can be renamed to something like SchemaPruning, but I leave that to a later followup to reduce the diff.
After finishing reviews, we can rename it as a final commit.
    Row(null, null) ::
    Nil)
}

testSchemaPruning("select a single complex field and the partition column") {
  val query = sql("select name.middle, p from contacts")
  checkScan(query, "struct<name:struct<middle:string>>")
  checkAnswer(query.orderBy("id"),
    Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
Those tests can't be shared with ORC, because they depend on schema merging.
Ur, do you mean spark.sql.parquet.mergeSchema is enabled in this test suite? I guess it's disabled by default.
Ah, sorry, it is not due to schema merging. But the inferred schemas of ORC and Parquet are different. We can test it on the current master branch like:
withTempPath { dir =>
  val path = dir.getCanonicalPath
  makeDataSourceFile(contacts, new File(path + "/contacts/p=1"))
  makeDataSourceFile(briefContacts, new File(path + "/contacts/p=2"))
  spark.read.format(dataSourceName).load(path + "/contacts").createOrReplaceTempView("contacts")
  spark.sql("select * from contacts").printSchema()
}
When dataSourceName is parquet, the schema is:
root
|-- id: integer (nullable = true)
|-- name: struct (nullable = true)
| |-- first: string (nullable = true)
| |-- middle: string (nullable = true)
| |-- last: string (nullable = true)
|-- address: string (nullable = true)
|-- pets: integer (nullable = true)
|-- friends: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- first: string (nullable = true)
| | |-- middle: string (nullable = true)
| | |-- last: string (nullable = true)
|-- relatives: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- first: string (nullable = true)
| | |-- middle: string (nullable = true)
| | |-- last: string (nullable = true)
|-- employer: struct (nullable = true)
| |-- id: integer (nullable = true)
| |-- company: struct (nullable = true)
| | |-- name: string (nullable = true)
| | |-- address: string (nullable = true)
|-- p: integer (nullable = true)
For ORC, it is:
root
|-- id: integer (nullable = true)
|-- name: struct (nullable = true)
| |-- first: string (nullable = true)
| |-- last: string (nullable = true)
|-- address: string (nullable = true)
|-- p: integer (nullable = true)
I think this is ok for review now. cc @dongjoon-hyun @cloud-fan @dbtsai
@dongjoon-hyun Thanks. Updated the benchmark result after rebasing with master.
Test build #103074 has finished for PR 23943 at commit
Top-level column      113    196    89    8.8    113.0    1.0X
Nested column        1316   1639   240    0.8   1315.5    0.1X
Top-level column      116    151    36    8.6    116.3    1.0X
Nested column         544    604    31    1.8    544.5    0.2X
Nice, 2x faster than before. BTW, Parquet now has a ratio of about 10:4 and ORC about 10:2. In other words, nested column reads are about 2 times slower than top-level reads in Parquet and about 5 times slower in ORC. I guess there still exists some overhead in this PR (compared with Parquet). Could you optimize more in the current approach?
PARQUET
Selection: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Top-level column 88 114 16 11.4 87.5 1.0X
Nested column 201 223 27 5.0 200.5 0.4X
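The slowdown ratios quoted in this thread can be double-checked from the Per Row(ns) columns of the two benchmark tables above (a quick sanity check, not part of the PR itself):

```scala
// Sanity check of the slowdown ratios discussed in this thread, using the
// "Per Row(ns)" values copied from the benchmark tables above.
val parquetTopLevelNs = 87.5
val parquetNestedNs = 200.5
val orcTopLevelNs = 116.3
val orcNestedNs = 544.5

val parquetSlowdown = parquetNestedNs / parquetTopLevelNs // about 2.3x
val orcSlowdown = orcNestedNs / orcTopLevelNs             // about 4.7x
println(f"Parquet nested vs top-level: $parquetSlowdown%.1fx slower")
println(f"ORC nested vs top-level: $orcSlowdown%.1fx slower")
```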
cc @cloud-fan , @gengliangwang , @gatorsmile
I think the read performance is determined more by the underlying file-format library than by the Spark side here. As with Parquet, on the Spark side we provide a correctly pruned nested schema to the ORC library; pruning nested fields while reading data is done by the ORC library itself. I'm not sure we have much room to optimize on the Spark side for this. Of course, I'm open to any suggestion I'm missing right now.
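As a rough illustration of what the Spark side contributes here, this is a toy sketch of nested schema pruning. The ToyType/ToyStruct types and the prune helper are hypothetical stand-ins, not Spark's StructType API or this PR's code: given the leaf paths a query touches, keep only those fields, then hand the pruned schema to the reader.

```scala
// Toy model of a nested schema; ToyType/ToyStruct are illustrative stand-ins,
// not Spark's StructType API.
sealed trait ToyType
case object TString extends ToyType
case object TInt extends ToyType
case class ToyStruct(fields: Map[String, ToyType]) extends ToyType

// Keep only the requested leaf paths, e.g. Seq(Seq("name", "middle")).
def prune(schema: ToyStruct, paths: Seq[Seq[String]]): ToyStruct = {
  val byHead = paths.filter(_.nonEmpty).groupBy(_.head)
  ToyStruct(byHead.toSeq.flatMap { case (name, subPaths) =>
    schema.fields.get(name).map {
      case s: ToyStruct =>
        val rest = subPaths.map(_.tail).filter(_.nonEmpty)
        // A struct requested without sub-fields is kept whole.
        name -> (if (rest.isEmpty) (s: ToyType) else prune(s, rest))
      case leaf => name -> leaf
    }
  }.toMap)
}

val contacts = ToyStruct(Map(
  "id" -> TInt,
  "name" -> ToyStruct(Map("first" -> TString, "middle" -> TString, "last" -> TString))))
// Analogous to checkScan(query, "struct<name:struct<middle:string>>") in the tests.
val pruned = prune(contacts, Seq(Seq("name", "middle")))
```

In this sketch the pruned schema contains only name.middle, which mirrors what the physical scan is expected to read in the testSchemaPruning cases above.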
@@ -1540,8 +1540,8 @@ object SQLConf {
      .internal()
      .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
        "satisfying a query. This optimization allows columnar file format readers to avoid " +
-       "reading unnecessary nested column data. Currently Parquet is the only data source that " +
-       "implements this optimization.")
+       "reading unnecessary nested column data. Currently Parquet and ORC are the data sources " +
Shall we specify ORC v1 instead of ORC, because ORC means v2 by default in Spark 3.0.0?
import org.apache.spark.sql.types.StructType

abstract class SchemaPruningSuite
  extends QueryTest
Indentation? extends uses a two-space indent.
checkScan(query, "struct<name:struct<first:string,middle:string,last:string>>")
checkAnswer(query.orderBy("id"),
  Row("X.", Row("Jane", "X.", "Doe")) ::
  Row("Y.", Row("John", "Y.", "Doe")) ::
We don't need more spaces here.
Sure. Automatically edited by the IDE...
"relatives:map<string,struct<first:string,middle:string,last:string>>>") | ||
checkAnswer(query.orderBy("id"), | ||
Row(0, "Doe", "X.", "Jane", null, null, null, "Smith", "Z.", "Susan", 1, "123 Main Street") :: | ||
Row(1, "Doe", "Y.", "John", null, null, null, null, null, null, 3, "321 Wall Street") :: |
ditto.
}

/**
 * Overrides this because ORC datasource doesn't support schema merging currently.
If you give the final schema as the user-defined schema, it will work.
I haven't tried, but I guess with a user-specified schema it should work.
briefContacts.map { case BriefContact(id, name, address) =>
  BriefContactWithDataPartitionColumn(id, name, address, 2) }

testSchemaPruning("select a single complex field array and its parent struct array") {
These are all the same test cases, aren't they?
Yes, we should be able to use all the same tests for Parquet and ORC once the user-specified schema works.
Test build #103121 has finished for PR 23943 at commit
retest this please
Test build #103127 has finished for PR 23943 at commit
@dongjoon-hyun Thanks for your review! By using a user-specified schema, the Parquet and ORC schema pruning suites use the same test cases now.
Test build #103258 has finished for PR 23943 at commit
Retest this please.
Test build #103286 has finished for PR 23943 at commit
@@ -206,7 +209,7 @@ class OrcFileFormat
        Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)
      batchReader.initialize(fileSplit, taskAttemptContext)
      batchReader.initBatch(
-       reader.getSchema,
+       TypeDescription.fromString(resultSchemaString),
This is required because we select a subset of the file schema (= reader.getSchema).
Could you rebase to
@dongjoon-hyun Rebased now. Thanks!
Test build #103309 has finished for PR 23943 at commit
      idx
    } else {
      -1
    }
  })
} else {
  // Do case-insensitive resolution only if in case-insensitive mode
  val caseInsensitiveOrcFieldMap =
    orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase(Locale.ROOT))
Since we don't need the old index, shall we remove the obsolete indexes?
- val caseInsensitiveOrcFieldMap =
-   orcFieldNames.zipWithIndex.groupBy(_._1.toLowerCase(Locale.ROOT))
+ val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
  Some(requiredSchema.fieldNames.zipWithIndex.map { case (requiredFieldName, idx) =>
    caseInsensitiveOrcFieldMap
      .get(requiredFieldName.toLowerCase(Locale.ROOT))
      .map { matchedOrcFields =>
        if (matchedOrcFields.size > 1) {
          // Need to fail if there is ambiguity, i.e. more than one field is matched.
-         val matchedOrcFieldsString = matchedOrcFields.map(_._1).mkString("[", ", ", "]")
+         val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]")
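The simplified resolution suggested in this diff can be exercised standalone. The sketch below uses toy field names and a plain exception; it is not the actual OrcUtils code, which maps names to column indexes and raises Spark's own error:

```scala
import java.util.Locale

// Toy physical field names as read from an ORC file footer (illustrative data).
val orcFieldNames = Seq("id", "Name", "name", "address")

// Group physical names by their lower-cased form, without carrying indexes.
val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))

// Resolve a required field name case-insensitively: None if absent,
// fail if more than one physical field matches.
def resolve(requiredFieldName: String): Option[String] =
  caseInsensitiveOrcFieldMap.get(requiredFieldName.toLowerCase(Locale.ROOT)).map {
    matchedOrcFields =>
      if (matchedOrcFields.size > 1) {
        // Need to fail if there is ambiguity, i.e. more than one field is matched.
        val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]")
        throw new RuntimeException(
          s"Found duplicate field(s) $requiredFieldName: $matchedOrcFieldsString")
      }
      matchedOrcFields.head
  }
```

Here resolve("ID") finds the unique field id, while resolve("NAME") fails because both Name and name match case-insensitively.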
Sure.
Test build #103379 has finished for PR 23943 at commit
+1, LGTM. Merged to master. Thank you, @viirya .
cc @dbtsai , @cloud-fan , @gatorsmile , @gengliangwang
@viirya . Since this is merged now, could you make a planned followup PR with renaming
…ning

What changes were proposed in this pull request?
This is a followup to #23943. This proposes to rename ParquetSchemaPruning to SchemaPruning as ParquetSchemaPruning supports both Parquet and ORC v1 now.

How was this patch tested?
Existing tests.

Closes #24077 from viirya/nested-schema-pruning-orc-followup. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…g BM result with EC2

What changes were proposed in this pull request?
This is a follow up PR for #23943 in order to update the benchmark result with an EC2 `r3.xlarge` instance.

How was this patch tested?
N/A. (Manually compare the diff)

Closes #24078 from dongjoon-hyun/SPARK-27034. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: DB Tsai <d_tsai@apple.com>
"reading unnecessary nested column data. Currently Parquet is the only data source that " + | ||
"implements this optimization.") | ||
"reading unnecessary nested column data. Currently Parquet and ORC v1 are the " + | ||
"data sources that implement this optimization.") | ||
.booleanConf | ||
.createWithDefault(false) |
@dbtsai @dongjoon-hyun We turned on this flag by default in the upcoming 3.0 because Apple has tried this in production over the last few months. I am wondering if that statement also covers ORC nested schema pruning?
We mainly use it and test it with Parquet.
What changes were proposed in this pull request?
We only supported nested schema pruning for Parquet previously. This proposes to support nested schema pruning for ORC too.
Note: this only covers ORC v1. For ORC v2, the necessary change is in the schema pruning rule. We should deal with ORC v2 as a TODO item, in order to reduce the review burden.
How was this patch tested?
Added tests.