[SPARK-32792][SQL] Improve Parquet In filter pushdown #29642
Conversation
Test build #128266 has finished for PR 29642 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@wangyum Do you have any further comments? If not, shall we close this one?
Kubernetes integration test starting
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130244 has finished for PR 29642 at commit
Test build #130245 has finished for PR 29642 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #131236 has finished for PR 29642 at commit
case Some(dataType) =>
  val sortedValues = values.sorted(TypeUtils.getInterpretedOrdering(dataType))
  createFilterHelper(
    sources.And(sources.GreaterThanOrEqual(name, sortedValues.head),
      sources.LessThanOrEqual(name, sortedValues.last)),
    canPartialPushDownConjuncts)
The logic is the same as HiveShim.scala#L746-L750.
spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
Lines 746 to 750 in 09bb9be
case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
  val dataType = child.dataType
  val sortedValues = values.toSeq.sorted(TypeUtils.getInterpretedOrdering(dataType))
  convert(And(GreaterThanOrEqual(child, Literal(sortedValues.head, dataType)),
    LessThanOrEqual(child, Literal(sortedValues.last, dataType))))
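The rewrite in both snippets can be sketched without any Spark dependencies: sort the IN values with the type's ordering and keep only the two endpoints. A minimal sketch, where `Predicate` and the case classes are illustrative stand-ins for Spark's `sources.Filter` hierarchy, not Spark's actual API:

```scala
// Stand-ins for Spark's sources.Filter hierarchy (illustrative only).
sealed trait Predicate
case class GreaterThanOrEqual(attr: String, value: Int) extends Predicate
case class LessThanOrEqual(attr: String, value: Int) extends Predicate
case class And(left: Predicate, right: Predicate) extends Predicate

// Replace `attr IN (values)` with `attr >= min AND attr <= max`.
// Spark sorts with TypeUtils.getInterpretedOrdering(dataType); for this
// sketch a plain Int ordering is enough.
def rewriteInToRange(attr: String, values: Seq[Int]): Predicate = {
  val sorted = values.sorted
  And(GreaterThanOrEqual(attr, sorted.head), LessThanOrEqual(attr, sorted.last))
}
```

For large IN lists this turns N comparisons into two, which is what lets Parquet's row-group min/max statistics skip data effectively.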
@cloud-fan @dongjoon-hyun @HyukjinKwon It can be improved by 6.6X for InSet -> InFilters (values count: 100, distribution: 10):
Parquet Vectorized (Pushdown)    9520    9560    27    1.7    605.3    1.0X
Parquet Vectorized (Pushdown)     873     885    11   18.0     55.5    6.6X
ah, then can we turn it into a util method and use it in all the filter pushdown places?
ok, added a new function to TypeUtils.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #131289 has finished for PR 29642 at commit
shall we implement the logic in
It seems only Parquet is not well supported. This is the benchmark of CSV:
val rowsNum = 100 * 1000
val numIters = 3
val colsNum = 100
val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", TimestampType))
val schema = StructType(StructField("key", IntegerType) +: fields)
def columns(): Seq[Column] = {
val ts = Seq.tabulate(colsNum) { i =>
lit(Instant.ofEpochSecond(i * 12345678)).as(s"col$i")
}
($"id" % 1000).as("key") +: ts
}
withTempPath { path =>
spark.range(rowsNum).select(columns(): _*)
.write.option("header", true)
.csv(path.getAbsolutePath)
def readback = {
spark.read
.option("header", true)
.schema(schema)
.csv(path.getAbsolutePath)
}
def withFilter(filter: String, configEnabled: Boolean): Unit = {
withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) {
readback.filter(filter).noop()
}
}
Seq(5, 10, 50, 100, 500).foreach { count =>
Seq(10, 50).foreach { distribution =>
val title = s"InSet -> InFilters (values count: $count, distribution: $distribution)"
val benchmark = new Benchmark(title, rowsNum, output = output)
Seq(false, true).foreach { pushDownEnabled =>
val name = s"Native CSV Vectorized ${if (pushDownEnabled) s"(Pushdown)" else ""}"
benchmark.addCase(name, numIters) { _ =>
val filter =
Range(0, count).map(_ => scala.util.Random.nextInt(rowsNum * distribution / 100))
val whereExpr = s"key in(${filter.mkString(",")})"
withFilter(whereExpr, configEnabled = pushDownEnabled)
}
}
benchmark.run()
}
}
}
Result:
makeEq.lift(nameToParquetField(name).fieldType)
  .map(_(nameToParquetField(name).fieldNames, v))
}.reduceLeftOption(FilterApi.or)
case sources.In(name, values) if pushDownInFilterThreshold > 0 &&
@wangyum, the Impala reference sounds good. Can we make it general and push the range filter to other data sources as well?
If this is supposed to be beneficial in other sources too, I think it makes more sense to push it to the other sources as well anyway.
It seems only Parquet does not support In predicate pushdown well.
Parquet vs ORC:
spark/sql/core/benchmarks/FilterPushdownBenchmark-results.txt
Lines 439 to 482 in f5118f8
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 50, distribution: 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9281 9298 12 1.7 590.1 1.0X
Parquet Vectorized (Pushdown) 9546 9561 17 1.6 606.9 1.0X
Native ORC Vectorized 6877 6897 18 2.3 437.2 1.3X
Native ORC Vectorized (Pushdown) 661 668 15 23.8 42.0 14.0X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 50, distribution: 90): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9322 9335 22 1.7 592.7 1.0X
Parquet Vectorized (Pushdown) 9551 9573 18 1.6 607.2 1.0X
Native ORC Vectorized 6902 6915 13 2.3 438.8 1.4X
Native ORC Vectorized (Pushdown) 659 680 25 23.9 41.9 14.1X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 100, distribution: 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9278 9294 18 1.7 589.9 1.0X
Parquet Vectorized (Pushdown) 9520 9560 27 1.7 605.3 1.0X
Native ORC Vectorized 6855 6870 16 2.3 435.9 1.4X
Native ORC Vectorized (Pushdown) 795 808 16 19.8 50.5 11.7X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 100, distribution: 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9306 9311 4 1.7 591.6 1.0X
Parquet Vectorized (Pushdown) 9529 9551 16 1.7 605.8 1.0X
Native ORC Vectorized 6875 6882 7 2.3 437.1 1.4X
Native ORC Vectorized (Pushdown) 853 865 15 18.4 54.2 10.9X
OpenJDK 64-Bit Server VM 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09 on Linux 4.15.0-1044-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
InSet -> InFilters (values count: 100, distribution: 90): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Parquet Vectorized 9256 9271 9 1.7 588.5 1.0X
Parquet Vectorized (Pushdown) 9500 9520 13 1.7 604.0 1.0X
Native ORC Vectorized 6843 6857 9 2.3 435.1 1.4X
Native ORC Vectorized (Pushdown) 858 870 14 18.3 54.6 10.8X
CSV:
#29642 (comment)
@@ -704,8 +704,8 @@ object SQLConf {
  val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
    buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
      .doc("The maximum number of values to filter push-down optimization for IN predicate. " +
        "Large threshold won't necessarily provide much better performance. " +
        "The experiment argued that 300 is the limit threshold. " +
        "Spark will push-down a value greater than or equal to its minimum value and " +
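The threshold's role can be illustrated with a small, Spark-free sketch (the function and strategy names are hypothetical; this only mirrors the decision described in the doc string above, not Spark's actual implementation):

```scala
// Illustrative gating logic: small IN lists are pushed as OR'ed equality
// filters; lists larger than the threshold fall back to a min/max range.
def pushdownStrategy(numValues: Int, threshold: Int): String =
  if (numValues == 0) "none"
  else if (numValues <= threshold) "or-of-eq"   // one Eq filter per value, combined with OR
  else "min-max-range"                          // only >= min and <= max are pushed
```

A larger threshold means more values are pushed as individual equality filters before the range fallback kicks in, which is why raising it does not necessarily improve performance.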
I think the default value 10 is small here. What is the default threshold in Impala?
Impala only optimizes it to >= minimum value and <= maximum value: apache/impala@aa05c64
Parquet Vectorized 10287 10449 144 1.5 654.0 1.0X
Parquet Vectorized (Pushdown) 467 494 20 33.7 29.7 22.0X
Native ORC Vectorized 6781 6848 58 2.3 431.1 1.5X
Native ORC Vectorized (Pushdown) 428 440 10 36.8 27.2 24.1X
ditto. 17 vs 17 -> 22 vs 24.
No. GitHub Actions runs on different machines; there is a performance difference between them.
According to the benchmark result, I'm a little confused about whether this PR is an improvement or not. Could you add some explanation about the improvement part, @wangyum? Maybe is it affected by the master branch instead of this PR?
Also, cc @huaxingao since this is Parquet filter pushdown.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138307 has finished for PR 29642 at commit
@dongjoon-hyun This PR only improves the
@dongjoon-hyun Do you have more comments?
No, @wangyum. I mean the ratio between ORC and Parquet on the same machine run. Previously, ORC and Parquet showed similar performance, but now Parquet looks slower than ORC after this PR, which increases the gap. For example, the following.
Although the values are too small, this generated result shows a slowdown of Parquet compared with ORC. That was my question.
@dongjoon-hyun I think this performance issue is not caused by this change. This PR only changes the
Yea, these benchmark results are not updated in time. Let's post the benchmark results before and after this PR in the PR description.
@dongjoon-hyun @cloud-fan Please see the latest benchmark result: 27a2bf6
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #138557 has finished for PR 29642 at commit
Thank you for updating, @wangyum. At the last commit, yes, I agree that it looks like there is no regression by this PR. One last question: could you spot what the improvement from this PR is in the last commit? It's not clear to me. Do we need to add some specific additional benchmark case for your contribution?
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #138582 has finished for PR 29642 at commit
@dongjoon-hyun I think the current benchmark is enough. I have updated the benchmark in the PR description.
+1, LGTM. Thank you so much, @wangyum and all!
Merged to master.
cc @aokolnychyi
uhoh .. seems like there's a logical conflict with #31776:
@wangyum are you online? can you take a quick look and fix or revert?
Oops.
### What changes were proposed in this pull request?
This fixes the compilation error due to the logical conflicts between #31776 and #29642.
### Why are the changes needed?
To recover compilation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes #32568 from wangyum/HOT-FIX.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
Support pushing down GreaterThanOrEqual the minimum value and LessThanOrEqual the maximum value for Parquet when sources.In's values exceed spark.sql.optimizer.inSetRewriteMinMaxThreshold. For example, we will push down id >= 1 and id <= 15.
Impala also has this improvement: https://issues.apache.org/jira/browse/IMPALA-3654
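As a concrete illustration of the example above (using a hypothetical helper, not code from this PR), the IN list is reduced to its endpoints when rendered as the pushed predicate:

```scala
// Render an IN list as the equivalent range predicate that gets pushed down.
// `inToRangeSql` is an illustrative helper, not part of Spark.
def inToRangeSql(column: String, values: Seq[Int]): String =
  s"$column >= ${values.min} and $column <= ${values.max}"
```

For an IN list containing 1, 5, and 15, this yields `id >= 1 and id <= 15`, matching the example in the description.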
Why are the changes needed?
Improve query performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test, manual test and benchmark test.
Before this PR:
After this PR: